/*
 * Copyright 2021 Shulie Technology, Co.Ltd
 * Email: shulie@shulie.io
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.shulie.surge.data.runtime.disruptor.dsl;

import io.shulie.surge.data.runtime.disruptor.*;

import java.util.*;

/**
 * Provides a repository mechanism to associate {@link EventHandler}s with {@link EventProcessor}s
 *
 * @param T the type of the {@link EventHandler}
 */
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
	private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
	private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
	private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

	public void add(final EventProcessor eventprocessor,
			final EventHandler<T> handler,
			final SequenceBarrier barrier)
	{
		final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
		eventProcessorInfoByEventHandler.put(handler, consumerInfo);
		eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
		consumerInfos.add(consumerInfo);
	}

	public void add(final EventProcessor processor)
	{
		final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(processor, null, null);
		eventProcessorInfoBySequence.put(processor.getSequence(), consumerInfo);
		consumerInfos.add(consumerInfo);
	}

	public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier)
	{
		final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<T>(workerPool, sequenceBarrier);
		consumerInfos.add(workerPoolInfo);
		for (Sequence sequence : workerPool.getWorkerSequences())
		{
			eventProcessorInfoBySequence.put(sequence, workerPoolInfo);
		}
	}

	public Sequence[] getLastSequenceInChain(boolean includeStopped)
	{
		List<Sequence> lastSequence = new ArrayList<Sequence>();
		for (ConsumerInfo consumerInfo : consumerInfos)
		{
			if ((includeStopped || consumerInfo.isRunning()) && consumerInfo.isEndOfChain())
			{
				final Sequence[] sequences = consumerInfo.getSequences();
				Collections.addAll(lastSequence, sequences);
			}
		}

		return lastSequence.toArray(new Sequence[lastSequence.size()]);
	}

	public EventProcessor getEventProcessorFor(final EventHandler<T> handler)
	{
		final EventProcessorInfo<?> eventprocessorInfo = getEventProcessorInfo(handler);
		if (eventprocessorInfo == null)
		{
			throw new IllegalArgumentException("The event handler " + handler + " is not processing events.");
		}

		return eventprocessorInfo.getEventProcessor();
	}

	public Sequence getSequenceFor(final EventHandler<T> handler)
	{
		return getEventProcessorFor(handler).getSequence();
	}

	public void unMarkEventProcessorsAsEndOfChain(final Sequence... barrierEventProcessors)
	{
		for (Sequence barrierEventProcessor : barrierEventProcessors)
		{
			getEventProcessorInfo(barrierEventProcessor).markAsUsedInBarrier();
		}
	}

	@Override
	public Iterator<ConsumerInfo> iterator()
	{
		return consumerInfos.iterator();
	}

	public SequenceBarrier getBarrierFor(final EventHandler<T> handler)
	{
		final ConsumerInfo consumerInfo = getEventProcessorInfo(handler);
		return consumerInfo != null ? consumerInfo.getBarrier() : null;
	}

	private EventProcessorInfo<T> getEventProcessorInfo(final EventHandler<T> handler)
	{
		return eventProcessorInfoByEventHandler.get(handler);
	}

	private ConsumerInfo getEventProcessorInfo(final Sequence barrierEventProcessor)
	{
		return eventProcessorInfoBySequence.get(barrierEventProcessor);
	}
}
